import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";

# Spark Getting Started Guide

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

## Setup

To use LakeSoul in Spark, first configure [Spark Catalog](01-setup-local-env.md). LakeSoul uses Apache Spark's DataSourceV2 API for its data source and catalog implementations. LakeSoul also provides a Scala table API that extends what you can do with LakeSoul tables.

### Spark 3 Support Matrix

LakeSoul | Spark Version
--- | ---
2.2.x-2.4.x | 3.3.x
2.0.x-2.1.x | 3.1.x

### Spark Shell/SQL/PySpark

Run spark-shell/spark-sql/pyspark with the `LakeSoulSparkSessionExtension` SQL extension.

<Tabs
    defaultValue="Spark SQL"
    values={[
        {label: 'Spark SQL', value: 'Spark SQL'},
        {label: 'Scala', value: 'Scala'},
        {label: 'PySpark', value: 'PySpark'},
    ]}>
  <TabItem value="Spark SQL" label="Spark SQL" default>

  ```bash
  spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-VAR::VERSION.jar
  ```

  </TabItem>

  <TabItem value="Scala" label="Scala">

  ```bash
  spark-shell --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-VAR::VERSION.jar
  ```
  </TabItem>
  <TabItem value="PySpark" label="PySpark">

  ```bash
  wget https://raw.githubusercontent.com/lakesoul-io/LakeSoul/main/python/lakesoul/spark/tables.py
  pyspark --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-spark-3.3-VAR::VERSION.jar --py-files tables.py
  ```
  </TabItem>

</Tabs>

### Setup Maven Project

Include maven dependencies in your project:
```xml
<dependency>
    <groupId>com.dmetasoul</groupId>
    <artifactId>lakesoul-spark</artifactId>
    <version>3.3-VAR::VERSION</version>
</dependency>
```

<Tabs>
<TabItem value="Scala" label="Scala">

```scala
// Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.lakesoul.LakeSoulOptions
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val builder = SparkSession.builder()
    .master("local")
    .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
    // register the catalog under the same name that spark.sql.defaultCatalog points to
    .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
    .config("spark.sql.defaultCatalog", "lakesoul")
val spark = builder.getOrCreate()

// import the implicits only after the session exists; they provide toDF on Seq
import spark.implicits._

```

</TabItem>

</Tabs>
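
For PySpark applications, the same settings can be applied when building the session. The following is a minimal sketch rather than an official LakeSoul helper: the configuration keys and values are copied from the `--conf` flags shown earlier, while `LAKESOUL_SPARK_CONF` and `build_lakesoul_session` are hypothetical names introduced here for illustration.

```python
# Settings copied from the --conf flags used for spark-sql / spark-shell / pyspark.
LAKESOUL_SPARK_CONF = {
    "spark.sql.extensions": "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension",
    "spark.sql.catalog.lakesoul": "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog",
    "spark.sql.defaultCatalog": "lakesoul",
}


def build_lakesoul_session(app_name="lakesoul-demo", master="local"):
    """Hypothetical helper: create (or reuse) a SparkSession with the settings above."""
    # Imported lazily so the settings can be inspected without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master(master).appName(app_name)
    for key, value in LAKESOUL_SPARK_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

The LakeSoul jar still has to be on the classpath, for example through `--jars` as in the commands above.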


## Create Namespace
First, create a namespace for LakeSoul tables. The default namespace of the LakeSoul catalog is `default`.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace;
USE lakesoul_namespace;
SHOW TABLES;
```
  </TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
```

  </TabItem>
  <TabItem value="PySpark" label="PySpark">

  ```python
  # python
  spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
  spark.sql("USE lakesoul_namespace")
  spark.sql("SHOW TABLES")
  ```
  </TabItem>
</Tabs>


## Create Table
Create a partitioned LakeSoul table in SQL with the `USING lakesoul` clause, or let the DataFrame writer (`df.write`) create it on the first `save`.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) 
USING lakesoul 
PARTITIONED BY (`date`) 
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table';
```

  </TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
```

  </TabItem>
  <TabItem value="PySpark" label="PySpark">

```python
# python
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
```
  </TabItem>
</Tabs>

### Primary Key Table
In LakeSoul, a table with primary keys is defined as a hash-partitioned table. To create one, use the `USING lakesoul` clause and set two `TBLPROPERTIES`: `'hashPartitions'` is a comma-separated list of the primary key column names, and `'hashBucketNum'` is the number of hash buckets.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) 
USING lakesoul 
PARTITIONED BY (date) 
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' 
TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2');
```
  </TabItem>

  <TabItem value="Scala" label="Scala">


```scala
// Scala
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
```

  </TabItem>
  <TabItem value="PySpark" label="PySpark">

  ```python
  # python
  spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
  ```
  </TabItem>
</Tabs>
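
When a table has several primary key columns, the `'hashPartitions'` value has to be assembled as a comma-separated list. The sketch below shows one way to generate the DDL from Python under that assumption; `hash_table_ddl` is a hypothetical helper written for this guide, not part of the LakeSoul API, and it only builds the same statement shape as the examples above.

```python
def hash_table_ddl(table, columns, partition_by, location, primary_keys, bucket_num):
    """Hypothetical helper: build CREATE TABLE DDL for a hash-partitioned table."""
    if not primary_keys:
        raise ValueError("at least one primary key column is required")
    if bucket_num < 1:
        raise ValueError("hashBucketNum must be a positive integer")

    column_sql = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    partition_sql = ", ".join(partition_by)
    # 'hashPartitions' takes the primary key columns as a comma-separated list
    key_list = ",".join(primary_keys)

    return (
        f"CREATE TABLE {table} ({column_sql}) "
        f"USING lakesoul "
        f"PARTITIONED BY ({partition_sql}) "
        f"LOCATION '{location}' "
        f"TBLPROPERTIES ('hashPartitions'='{key_list}', 'hashBucketNum'='{bucket_num}')"
    )


ddl = hash_table_ddl(
    table="lakesoul_hash_table",
    columns=[("id", "BIGINT NOT NULL"), ("name", "STRING"), ("date", "STRING")],
    partition_by=["date"],
    location="file:/tmp/lakesoul_namespace/lakesoul_hash_table",
    primary_keys=["id"],
    bucket_num=2,
)
print(ddl)
# spark.sql(ddl)  # run inside a session configured for LakeSoul
```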

### CDC Table
A hash-partitioned LakeSoul table can optionally record Change Data Capture (CDC) data so that data modifications can be tracked. To create a table with CDC support, use the DDL for a hash-partitioned table and add the `'lakesoul_cdc_change_column'` setting to `TBLPROPERTIES`. The setting names an implicit column (`op` in the example below) that the table uses to handle CDC information.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) 
USING lakesoul 
PARTITIONED BY (date) 
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' 
TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op');
```
  </TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
```

  </TabItem>
  <TabItem value="PySpark" label="PySpark">

```python
# python
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
```
  </TabItem>
</Tabs>


## Insert/Merge Data

To append new data to a non-hash-partitioned table using Spark SQL, use `INSERT INTO`.

To append new data to a table using a DataFrame, use the DataFrame writer API (`df.write`). If this is the first write to the table, the corresponding LakeSoul table is created automatically.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
INSERT INTO TABLE lakesoul_table VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-01'), (1, 'Cathy', '2024-01-02');
```

</TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
val data = Seq(Row(1L, "Alice", "2024-01-01"), Row(2L, "Bob", "2024-01-01"), Row(1L, "Cathy", "2024-01-02"))
val schema = StructType(Seq(StructField("id", LongType, false), StructField("name", StringType, true), StructField("date", StringType, false)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.write.format("lakesoul").insertInto("lakesoul_table")

```
  </TabItem>
  <TabItem value="PySpark" label="PySpark">

```python
# python
from pyspark.sql.types import *
data = [(1,"Cathy","2024-01-02")]
schema = StructType([StructField("id", LongType(), False), StructField("name", StringType(), True), StructField("date", StringType(), False)])
df = spark.createDataFrame(data,schema=schema)
df.write.format("lakesoul").insertInto("lakesoul_table")
```
  </TabItem>
</Tabs>

To append new data to a hash-partitioned table using Spark SQL, use `MERGE INTO`.

To append new data to a hash-partitioned table using a DataFrame, use the `LakeSoulTable` `upsert` API.


<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , name, date)
AS SELECT 1L as `id`, 'George' as `name`, '2024-01-01' as `date`;


MERGE INTO lakesoul_hash_table AS t 
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```

  </TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"

// Init hash table with first dataframe
val df = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
val writer = df.write.format("lakesoul").mode("overwrite")

writer
    .option("rangePartitions", "range")
    .option("hashPartitions", "hash")
    .option("hashBucketNum", 2)
    .save(tablePath)

// merge the second dataframe into hash table using LakeSoulTable upsert API
val dfUpsert = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
LakeSoulTable.forPath(tablePath).upsert(dfUpsert)

```
</TabItem>
 <TabItem value="PySpark" label="PySpark">

```python
# python
from pyspark.sql.types import *
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
df = spark.createDataFrame([(20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)],schema='range string,hash string,value string')
df.write.format("lakesoul").mode("overwrite").option("rangePartitions", "range").option("hashPartitions", "hash").option("hashBucketNum", 2).save(tablePath)
dfUpsert = spark.createDataFrame([(20201111, 1, 1), (20201111, 2, 2), (20201111, 3, 3), (20201112, 4, 4)],schema='range string,hash string,value string')
LakeSoulTable.forPath(spark,tablePath).upsert(dfUpsert)
```
 </TabItem>
</Tabs>
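
The `MERGE INTO` statement above updates matched rows and inserts unmatched ones. As a conceptual illustration only, the plain-Python sketch below models that behaviour on lists of dictionaries. It is not how LakeSoul implements merging, it ignores range partitions and hash buckets, and the `upsert` function and the sample rows are made up for this example.

```python
def upsert(target, source, key):
    """Model of WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *."""
    # Index the target rows by primary key; dicts keep insertion order.
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        # A matching key replaces the whole row, a new key appends a row.
        merged[row[key]] = dict(row)
    return list(merged.values())


# Illustrative rows only
target = [
    {"id": 1, "name": "Alice", "date": "2024-01-01"},
    {"id": 2, "name": "Bob", "date": "2024-01-01"},
]
source = [
    {"id": 1, "name": "George", "date": "2024-01-01"},  # matches id 1: update
    {"id": 3, "name": "Helen", "date": "2024-01-02"},   # no match: insert
]

result = upsert(target, source, key="id")
for row in result:
    print(row)
```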



## Update Data
LakeSoul tables can be updated through the DataFrame-based `LakeSoulTable` API or with a standard `UPDATE` statement.
To update a table from Scala, use the `LakeSoulTable` `updateExpr` API; from PySpark, use `update`.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>


```sql
UPDATE lakesoul_table SET name = 'David' WHERE id = 2;
```
  </TabItem>

  <TabItem value="Scala" label="Scala">

  ```scala
  // Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Seq(("name"->"'David'")).toMap)


  ```
  </TabItem>
  <TabItem value="PySpark" label="PySpark">

  ```python
  # python
  from tables import LakeSoulTable
  tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
  LakeSoulTable.forPath(spark,tablePath).update("hash = 4", { "value":"5"})
  ```
   </TabItem>
</Tabs>


## Delete Data
Records can be removed from LakeSoul tables through the DataFrame-based `LakeSoulTable` API or with a standard `DELETE` statement.
To delete data from a table using the DataFrame API, use the `LakeSoulTable` `delete` API.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
DELETE FROM lakesoul_table WHERE id = 1;
```

  </TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).delete("id = 1 or id =2")
```

</TabItem>
  <TabItem value="PySpark" label="PySpark">

  ```python
  # python
  from tables import LakeSoulTable
  tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
  LakeSoulTable.forPath(spark,tablePath).delete("hash = 4")
  ```
  </TabItem>
</Tabs>

## Query Data

LakeSoul tables can be queried using a DataFrame or Spark SQL.

<Tabs>
  <TabItem value="Spark SQL" label="Spark SQL" default>

```sql
SELECT * FROM lakesoul_table;
```

  </TabItem>

  <TabItem value="Scala" label="Scala">

```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"

// query data with DataFrameReader API
spark.read.format("lakesoul").load(tablePath)

// query data with LakeSoulTable API, by path
LakeSoulTable.forPath(tablePath).toDF

// query data with LakeSoulTable API, by table name
val tableName = "lakesoul_table"
LakeSoulTable.forName(tableName).toDF
```

</TabItem>
  <TabItem value="PySpark" label="PySpark">

  ```python
  # python
  from tables import LakeSoulTable
  tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"

  # query data with LakeSoulTable API
  LakeSoulTable.forPath(spark,tablePath).toDF().show()

  # query data with DataFrameReader API
  spark.read.format("lakesoul").load(tablePath).show()
  ```
  </TabItem>
</Tabs>

## Time Travel Query
LakeSoul supports time travel queries, which read a table as of any point in time in its history or return the data that changed between two commit times.

```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
    .toDF("range", "hash", "op")
    .write
    .mode("append")
    .format("lakesoul")
    .option("rangePartitions", "range")
    .option("hashPartitions", "hash")
    .option("hashBucketNum", "2")
    .option("shortTableName", "cdc_table")
    .option("lakesoul_cdc_change_column", "op")
    .save(tablePath)
// record the version of 1st commit 
import java.text.SimpleDateFormat

val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
.toDF("range", "hash", "op"))
// record the version of 2nd commit 
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

lakeTable.upsert(Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
.toDF("range", "hash", "op"))
lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
.toDF("range", "hash", "op"))
// record the version of 3rd,4th commits 
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


```
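
The Scala example records each version as a `yyyy-MM-dd HH:mm:ss` string. For readers working in Python, the sketch below produces strings in the same pattern with the standard library. `commit_marker` is a hypothetical helper name, and the sketch assumes the Python process and the JVM use the same local time zone.

```python
from datetime import datetime

# strftime equivalent of the Java pattern "yyyy-MM-dd HH:mm:ss"
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def commit_marker(moment=None):
    """Hypothetical helper: format a moment like versionA/versionB/versionC."""
    if moment is None:
        moment = datetime.now()  # local time, like SimpleDateFormat's default zone
    return moment.strftime(TIMESTAMP_FORMAT)


version_a = commit_marker()
print(version_a)
```

Because the pattern has one-second resolution, two writes made within the same second produce the same marker, so it may help to leave a short gap between commits when experimenting.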

### Complete Query
```scala
// Scala
spark.sql("SELECT * FROM cdc_table")
```

### Snapshot Query
A snapshot query reads the table as it was at a given point in time in its history.
```scala
// Scala
spark.read.format("lakesoul")
    .option(LakeSoulOptions.PARTITION_DESC, "range=range2")
    .option(LakeSoulOptions.READ_END_TIME, versionB)
    .option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.SNAPSHOT_READ)
    .load(tablePath)
```

### Incremental Query
LakeSoul supports incremental query to obtain a set of records that changed between a start and end time.

```scala
// Scala
spark.read.format("lakesoul")
    .option(LakeSoulOptions.PARTITION_DESC, "range=range1")
    .option(LakeSoulOptions.READ_START_TIME, versionA)
    .option(LakeSoulOptions.READ_END_TIME, versionB)
    .option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.INCREMENTAL_READ)
    .load(tablePath)
```
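
To make the difference between the two read types concrete, here is a simplified plain-Python model over a list of timestamped commits. It is a conceptual sketch, not LakeSoul's implementation: it does not merge rows by primary key, the function names and sample commits are invented, and the boundary handling (end time inclusive for snapshots, half-open window for incremental reads) is an assumption that should be checked against the LakeSoul documentation.

```python
def snapshot_read(commits, end_time):
    """Rows from every commit made up to end_time (inclusive, by assumption)."""
    return [row for ts, rows in commits if ts <= end_time for row in rows]


def incremental_read(commits, start_time, end_time):
    """Rows from commits inside the window [start_time, end_time) (assumed bounds)."""
    return [row for ts, rows in commits if start_time <= ts < end_time for row in rows]


# "yyyy-MM-dd HH:mm:ss" strings sort chronologically, so plain string
# comparison is enough for this model. Sample data is illustrative only.
commits = [
    ("2024-01-01 10:00:00", ["row-1", "row-2"]),
    ("2024-01-01 11:00:00", ["row-3"]),
    ("2024-01-01 12:00:00", ["row-4"]),
]

snapshot = snapshot_read(commits, "2024-01-01 11:30:00")
changes = incremental_read(commits, "2024-01-01 10:30:00", "2024-01-01 11:30:00")
print(snapshot)
print(changes)
```

In this model a snapshot accumulates everything written before the end time, while an incremental read returns only what was written inside the window.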

## Next steps
Next, learn more about using LakeSoul tables in Spark in the [Spark API docs](../03-Usage%20Docs/03-spark-api-docs.md).
